/*
   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/

#include "changelog-misc.h"
#include "changelog-mem-types.h"

#include "gf-changelog-helpers.h"
#include "changelog-rpc-common.h"
#include "changelog-lib-messages.h"

#include <glusterfs/syscall.h>

/**
 * Reverse socket: actual data transfer handler. Connection
 * initiator is PROBER, data transfer is REBORP.
 */

/*
 * Forward declaration: the program table is defined at the bottom of this
 * file, but gf_changelog_reborp_init_rpc_listner() needs to reference it
 * before that point.
 */
static struct rpcsvc_program *gf_changelog_reborp_programs[];

/**
 * Janitor thread: tears down brick entries queued on priv->cleanups.
 *
 * @arg is the xlator_t whose ->private holds the gf_private_t. The thread
 * sleeps on priv->cond until the cleanup list is non-empty, detaches the
 * first entry while holding priv->lock and then, outside the lock:
 *   0x0: disables the prober rpc client,
 *   0x1: runs gf_cleanup_event() on the entry's event list,
 *   0x2: unlinks and frees every event still queued on the entry,
 *   0x3: destroys the entry's state lock and frees the entry itself.
 *
 * The loop never terminates on its own; the trailing return only satisfies
 * the thread start-routine signature.
 */
void *
gf_changelog_connection_janitor(void *arg)
{
    int32_t ret = 0;
    xlator_t *this = NULL;
    gf_private_t *priv = NULL;
    gf_changelog_t *entry = NULL;
    struct gf_event *event = NULL;
    struct gf_event_list *ev = NULL;
    unsigned long drained = 0;

    this = arg;
    THIS = this;

    priv = this->private;

    while (1) {
        pthread_mutex_lock(&priv->lock);
        {
            /* loop guards against spurious wakeups */
            while (list_empty(&priv->cleanups))
                pthread_cond_wait(&priv->cond, &priv->lock);

            entry = list_first_entry(&priv->cleanups, gf_changelog_t, list);
            list_del_init(&entry->list);
        }
        pthread_mutex_unlock(&priv->lock);

        drained = 0;
        ev = &entry->event;

        gf_smsg(this->name, GF_LOG_INFO, 0,
                CHANGELOG_LIB_MSG_CLEANING_BRICK_ENTRY_INFO, "brick=%s",
                entry->brick, NULL);

        /* 0x0: disable rpc-clnt */
        rpc_clnt_disable(RPC_PROBER(entry));

        /* 0x1: cleanup callback invoker thread */
        ret = gf_cleanup_event(this, ev);
        if (ret) {
            /*
             * NOTE(review): the entry is already off the cleanup list and
             * is neither drained nor freed on this path -- presumably
             * because it may still be in use when cleanup fails; confirm.
             */
            continue;
        }

        /* 0x2: drain pending events */
        while (!list_empty(&ev->events)) {
            event = list_first_entry(&ev->events, struct gf_event, list);
            gf_smsg(this->name, GF_LOG_INFO, 0,
                    CHANGELOG_LIB_MSG_DRAINING_EVENT_INFO, "seq=%lu",
                    event->seq, "payload=%d", event->count, NULL);

            /*
             * Unlink before freeing: otherwise the list head keeps
             * pointing at the freed event, list_empty() never becomes
             * true and the same freed node is visited again.
             */
            list_del(&event->list);
            GF_FREE(event);
            drained++;
        }

        gf_smsg(this->name, GF_LOG_INFO, 0,
                CHANGELOG_LIB_MSG_DRAINED_EVENT_INFO, "num=%lu", drained, NULL);

        /* 0x3: freeup brick entry */
        gf_smsg(this->name, GF_LOG_INFO, 0,
                CHANGELOG_LIB_MSG_FREEING_ENTRY_INFO, "entry=%p", entry, NULL);
        LOCK_DESTROY(&entry->statelock);
        GF_FREE(entry);
    }

    return NULL;
}

/**
 * rpcsvc notification hook for the reverse (reborp) listener.
 *
 * Only ACCEPT and DISCONNECT are acted upon; every other event returns
 * immediately without touching @mydata. @mydata is the gf_changelog_t
 * brick entry registered as callback data.
 *
 *  - ACCEPT: the reverse socket path is unlinked (a failure is only logged
 *    as a warning) and the entry's ->connected callback, if set, is invoked.
 *  - DISCONNECT: the entry's ->disconnected callback, if set, is invoked.
 *
 * Always returns 0.
 */
int
gf_changelog_reborp_rpcsvc_notify(rpcsvc_t *rpc, void *mydata,
                                  rpcsvc_event_t event, void *data)
{
    gf_changelog_t *entry = mydata;
    xlator_t *this = NULL;

    if (event != RPCSVC_EVENT_ACCEPT && event != RPCSVC_EVENT_DISCONNECT)
        return 0;

    this = entry->this;

    if (event == RPCSVC_EVENT_ACCEPT) {
        if (sys_unlink(RPC_SOCK(entry)) != 0) {
            gf_smsg(this->name, GF_LOG_WARNING, errno,
                    CHANGELOG_LIB_MSG_UNLINK_FAILED, "name=reverse socket",
                    "path=%s", RPC_SOCK(entry), NULL);
        }

        if (entry->connected) {
            GF_CHANGELOG_INVOKE_CBK(this, entry->connected, entry->brick,
                                    entry->ptr);
        }
    } else {
        /* RPCSVC_EVENT_DISCONNECT */
        if (entry->disconnected) {
            GF_CHANGELOG_INVOKE_CBK(this, entry->disconnected, entry->brick,
                                    entry->ptr);
        }
    }

    return 0;
}

/**
 * Set up the reverse-connection RPC listener for a brick.
 *
 * CHANGELOG_MAKE_TMP_SOCKET_PATH is applied to (@path, @sock,
 * UNIX_PATH_MAX) -- presumably filling @sock with a socket path derived
 * from the brick @path; confirm against the macro. The server is then
 * started on @sock with this file's notify hook and program table, and
 * @cbkdata is passed through as the callback data.
 *
 * Returns whatever changelog_rpc_server_init() returns.
 */
rpcsvc_t *
gf_changelog_reborp_init_rpc_listner(xlator_t *this, char *path, char *sock,
                                     void *cbkdata)
{
    rpcsvc_t *svc = NULL;

    CHANGELOG_MAKE_TMP_SOCKET_PATH(path, sock, UNIX_PATH_MAX);

    svc = changelog_rpc_server_init(this, sock, cbkdata,
                                    gf_changelog_reborp_rpcsvc_notify,
                                    gf_changelog_reborp_programs);
    return svc;
}

/**
 * Deliver the events carried in @vec to the consumer callback of @entry.
 *
 * There is no event filtering in the server yet, so every event record in
 * every payload buffer is scanned here and only those accepted by
 * gf_changelog_filter_check() are handed to entry->callback. Ideally we
 * would only be notified about the events selected at probe time.
 *
 * @vec is indexed as an array of @payloadcnt pointers to IO vectors. Each
 * buffer is walked as (iov_len / CHANGELOG_EV_SIZE) consecutive
 * changelog_event_t records; a trailing partial record is not visited.
 *
 * @FIXME: clean this up once the server filters events.
 */
void
gf_changelog_invoke_callback(gf_changelog_t *entry, struct iovec **vec,
                             int payloadcnt)
{
    xlator_t *this = entry->this;
    changelog_event_t *event = NULL;
    int remaining = 0;
    int idx = 0;

    for (idx = 0; idx < payloadcnt; idx++) {
        event = (changelog_event_t *)vec[idx]->iov_base;
        remaining = vec[idx]->iov_len / CHANGELOG_EV_SIZE;

        while (remaining > 0) {
            if (gf_changelog_filter_check(entry, event)) {
                GF_CHANGELOG_INVOKE_CBK(this, entry->callback, entry->brick,
                                        entry->ptr, event);
            }

            event++;
            remaining--;
        }
    }
}

/**
 * Ordered event handler is self-adaptive.. if the event sequence number
 * is what's expected (->next_seq) there is no ordering list that's
 * maintained. On out-of-order event notifications, event buffers are
 * dynamically allocated and ordered.
 */

/**
 * Return 1 when @event carries the sequence number @ev is waiting for
 * (ev->next_seq), 0 otherwise.
 */
int
__is_expected_sequence(struct gf_event_list *ev, struct gf_event *event)
{
    if (event->seq != ev->next_seq)
        return 0;

    return 1;
}

/**
 * Inspect the first queued event of @ev and store it in *@event.
 *
 * If it carries the expected sequence number it is unlinked from the list,
 * ev->next_seq is advanced and 1 is returned. Otherwise the event stays
 * queued and 0 is returned (*@event still points at it).
 *
 * The list must not be empty; pick_event_ordered() checks that first and
 * calls this with ev->lock held.
 */
int
__can_process_event(struct gf_event_list *ev, struct gf_event **event)
{
    struct gf_event *head = NULL;

    head = list_first_entry(&ev->events, struct gf_event, list);
    *event = head;

    if (!__is_expected_sequence(ev, head))
        return 0;

    list_del(&head->list);
    ev->next_seq++;

    return 1;
}

/**
 * Block until the event with the expected sequence number is at the front
 * of @ev's queue, then dequeue it into *@event.
 *
 * Waits on ev->cond (with ev->lock held) while the queue is empty or its
 * first event is not the expected one.
 */
void
pick_event_ordered(struct gf_event_list *ev, struct gf_event **event)
{
    pthread_mutex_lock(&ev->lock);

    for (;;) {
        /* __can_process_event() is only consulted on a non-empty list */
        if (!list_empty(&ev->events) && __can_process_event(ev, event))
            break;

        pthread_cond_wait(&ev->cond, &ev->lock);
    }

    pthread_mutex_unlock(&ev->lock);
}

/**
 * Block until @ev's queue is non-empty, then dequeue its first event into
 * *@event regardless of sequence number.
 */
void
pick_event_unordered(struct gf_event_list *ev, struct gf_event **event)
{
    struct gf_event *head = NULL;

    pthread_mutex_lock(&ev->lock);

    while (list_empty(&ev->events)) {
        pthread_cond_wait(&ev->cond, &ev->lock);
    }

    head = list_first_entry(&ev->events, struct gf_event, list);
    list_del(&head->list);
    *event = head;

    pthread_mutex_unlock(&ev->lock);
}

/**
 * Callback invoker thread for one brick entry.
 *
 * @arg is the struct gf_event_list of the entry. The thread repeatedly
 * picks the next event through entry->pickevent() (which blocks until one
 * is available), delivers its payload to the consumer callback and frees
 * the event.
 *
 * The loop never terminates on its own; the trailing return only satisfies
 * the thread start-routine signature.
 */
void *
gf_changelog_callback_invoker(void *arg)
{
    int i = 0;
    xlator_t *this = NULL;
    gf_changelog_t *entry = NULL;
    struct iovec *vec = NULL;
    struct gf_event *event = NULL;
    struct gf_event_list *ev = NULL;

    ev = arg;
    entry = ev->entry;
    THIS = this = entry->this;

    while (1) {
        entry->pickevent(ev, &event);

        /*
         * gf_changelog_invoke_callback() indexes its argument as an array
         * of iovec pointers (vec[i]->...). Only the address of a single
         * local pointer is available here, so passing it together with
         * event->count would read past that pointer whenever an event
         * carries more than one IO vector. Hand over one vector per call.
         */
        for (i = 0; i < event->count; i++) {
            vec = &event->iov[i];
            gf_changelog_invoke_callback(entry, &vec, 1);
        }

        GF_FREE(event);
    }

    return NULL;
}

/**
 * Ordering predicate handed to list_add_order(): compares two queued
 * events by sequence number.
 *
 * Returns 1 when the event at @pos1 has a strictly greater sequence number
 * than the one at @pos2, and -1 otherwise (including when they are equal).
 */
static int
orderfn(struct list_head *pos1, struct list_head *pos2)
{
    struct gf_event *lhs = list_entry(pos1, struct gf_event, list);
    struct gf_event *rhs = list_entry(pos2, struct gf_event, list);

    return (lhs->seq > rhs->seq) ? 1 : -1;
}

/**
 * Insert @event into @ev's queue in sequence order (via orderfn) and wake a
 * waiter only if the event is the one currently expected.
 *
 * When ev->next_seq is still zero it is seeded with this event's sequence
 * number, so the first event queued defines where ordering starts.
 */
void
queue_ordered_event(struct gf_event_list *ev, struct gf_event *event)
{
    pthread_mutex_lock(&ev->lock);

    list_add_order(&event->list, &ev->events, orderfn);

    if (ev->next_seq == 0) {
        ev->next_seq = event->seq;
    }

    /* an out-of-order arrival stays queued without waking anyone */
    if (event->seq == ev->next_seq) {
        pthread_cond_signal(&ev->cond);
    }

    pthread_mutex_unlock(&ev->lock);
}

/**
 * Append @event to the tail of @ev's queue and signal ev->cond, with
 * ev->lock held for both steps.
 */
void
queue_unordered_event(struct gf_event_list *ev, struct gf_event *event)
{
    struct list_head *node = &event->list;

    pthread_mutex_lock(&ev->lock);

    list_add_tail(node, &ev->events);
    pthread_cond_signal(&ev->cond);

    pthread_mutex_unlock(&ev->lock);
}

/**
 * Decode one changelog event RPC, deep-copy its payload into a freshly
 * allocated struct gf_event and hand it to entry->queueevent().
 *
 * The payload consists of whatever follows the XDR-decoded request in
 * req->msg[0] plus all of req->msg[1 .. req->count - 1]. Each non-empty
 * piece gets one IO vector in the event; event->count is the number of
 * vectors actually populated.
 *
 * A reply is always submitted: op_ret 0 with the request's sequence number
 * on success, or op_ret -1 with sequence 0 when XDR decoding or the
 * allocation fails.
 *
 * Returns the result of changelog_rpc_sumbit_reply().
 */
int
gf_changelog_event_handler(rpcsvc_request_t *req, xlator_t *this,
                           gf_changelog_t *entry)
{
    int i = 0;
    int slot = 0; /* next free IO vector in the event */
    size_t payloadlen = 0;
    size_t firstlen = 0; /* payload bytes trailing the header in msg[0] */
    ssize_t len = 0;
    int payloadcnt = 0;
    changelog_event_req rpc_req = {
        0,
    };
    changelog_event_rsp rpc_rsp = {
        0,
    };
    struct iovec *vec = NULL;
    struct gf_event *event = NULL;
    struct gf_event_list *ev = NULL;

    ev = &entry->event;

    len = xdr_to_generic(req->msg[0], &rpc_req,
                         (xdrproc_t)xdr_changelog_event_req);
    if (len < 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0,
               CHANGELOG_LIB_MSG_XDR_DECODING_FAILED, "xdr decoding failed");
        req->rpc_err = GARBAGE_ARGS;
        goto handle_xdr_error;
    }

    /* first pass: count the vectors and bytes that need to be copied */
    if ((size_t)len < req->msg[0].iov_len) {
        firstlen = req->msg[0].iov_len - (size_t)len;
        payloadcnt = 1;
        payloadlen = firstlen;
    }
    for (i = 1; i < req->count; i++) {
        payloadcnt++;
        payloadlen += req->msg[i].iov_len;
    }

    event = GF_CALLOC(1, GF_EVENT_CALLOC_SIZE(payloadcnt, payloadlen),
                      gf_changelog_mt_libgfchangelog_event_t);
    if (!event)
        goto handle_xdr_error;
    INIT_LIST_HEAD(&event->list);

    /* reused as the running offset updated by GF_EVENT_ASSIGN_IOVEC */
    payloadlen = 0;
    event->seq = rpc_req.seq;
    event->count = payloadcnt;

    /*
     * deep copy IO vectors
     *
     * Only the vectors counted above are populated, in consecutive slots.
     * Writing iov[0] unconditionally would touch a slot that was not
     * counted when msg[0] has no trailing payload, and would shift the
     * remaining vectors past event->count.
     */
    if (firstlen > 0) {
        vec = &event->iov[slot++];
        GF_EVENT_ASSIGN_IOVEC(vec, event, firstlen, payloadlen);
        (void)memcpy(vec->iov_base, (char *)req->msg[0].iov_base + len,
                     vec->iov_len);
    }

    for (i = 1; i < req->count; i++) {
        vec = &event->iov[slot++];
        GF_EVENT_ASSIGN_IOVEC(vec, event, req->msg[i].iov_len, payloadlen);
        (void)memcpy(vec->iov_base, req->msg[i].iov_base,
                     req->msg[i].iov_len);
    }

    gf_msg_debug(this->name, 0,
                 "seq: %" PRIu64 " [%s] (time: %" PRIu64 ".%" PRIu64
                 "), "
                 "(vec: %d, len: %zu)",
                 rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec,
                 payloadcnt, payloadlen);

    /* dispatch event */
    entry->queueevent(ev, event);

    /* ack sequence number */
    rpc_rsp.op_ret = 0;
    rpc_rsp.seq = rpc_req.seq;

    goto submit_rpc;

handle_xdr_error:
    rpc_rsp.op_ret = -1;
    rpc_rsp.seq = 0; /* invalid */
submit_rpc:
    return changelog_rpc_sumbit_reply(req, &rpc_rsp, NULL, 0, NULL,
                                      (xdrproc_t)xdr_changelog_event_rsp);
}

/**
 * RPC actor for CHANGELOG_REV_PROC_EVENT.
 *
 * Recovers the brick entry from the service's ->mydata, switches THIS to
 * the entry's xlator and delegates to gf_changelog_event_handler(), whose
 * return value is passed back unchanged.
 */
int
gf_changelog_reborp_handle_event(rpcsvc_request_t *req)
{
    rpcsvc_t *svc = rpcsvc_request_service(req);
    gf_changelog_t *entry = svc->mydata;
    xlator_t *this = entry->this;

    THIS = this;

    return gf_changelog_event_handler(req, this, entry);
}

/*
 * Procedure table of the reborp RPC program, indexed by procedure number.
 * The only slot populated here is CHANGELOG_REV_PROC_EVENT, which is
 * dispatched to gf_changelog_reborp_handle_event().
 */
static rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
    [CHANGELOG_REV_PROC_EVENT] = {"CHANGELOG EVENT HANDLER",
                                  gf_changelog_reborp_handle_event, NULL,
                                  CHANGELOG_REV_PROC_EVENT, DRC_NA, 0},
};

/**
 * Program descriptor for the reverse (reborp) RPC service.
 *
 * Do not use synctask, as the RPC layer dereferences ->mydata as THIS.
 * In gf_changelog_setup_rpc(), @cbkdata is of type @gf_changelog_t,
 * which is required to invoke the callback with the appropriate
 * brick path and its private data.
 */
static struct rpcsvc_program gf_changelog_reborp_prog = {
    .progname = "LIBGFCHANGELOG REBORP",
    .prognum = CHANGELOG_REV_RPC_PROCNUM,
    .progver = CHANGELOG_REV_RPC_PROCVER,
    .numactors = CHANGELOG_REV_PROC_MAX,
    .actors = gf_changelog_reborp_actors,
    .synctask = _gf_false,
};

/*
 * NULL-terminated list of programs passed to changelog_rpc_server_init()
 * by gf_changelog_reborp_init_rpc_listner().
 */
static struct rpcsvc_program *gf_changelog_reborp_programs[] = {
    &gf_changelog_reborp_prog,
    NULL,
};
